本文基于Rxjava 2.x版本,介绍用于创建 Observable 对象的操作符。
Operators that originate new Observables.
Create — create an Observable from scratch by calling observer methods programmatically
Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
Empty/Never/Throw — create Observables that have very precise and limited behavior
From — convert some other object or data structure into an Observable
Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
Just — convert an object or a set of objects into an Observable that emits that or those objects
Range — create an Observable that emits a range of sequential integers
Repeat — create an Observable that emits a particular item or sequence of items repeatedly
Start — create an Observable that emits the return value of a function
Timer — create an Observable that emits a single item after a given delay
create 操作符 通过 create
方法构造 Observable 对象,通过被观察者的 subcribe
方法建立起观察者与被观察者的联系。
create example
1 2 3 4 5 6 7 8 9 Observable<String> observable = Observable.create(emitter -> { emitter.onNext("Hello" ); emitter.onNext("World" ); emitter.onComplete(); emitter.onError(new NullPointerException()); }); observable.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("onComplete" ));
defer 操作符 一个允许延迟操作的操作符,直到 ObservableSource 订阅了观察者 Observer,发射源 emitting item 才会发射数据。defer
会为每一个 Observer 观察者对象创建新的 Observable ,所以下面两次打印数据是不同的。而 create
操作符观察者对象无论被订阅多少次,数据都是相同的。
defer example
1 2 3 4 5 6 7 8 9 Observable<Long> observable = Observable.defer(() -> { long time = System.currentTimeMillis(); return Observable.just(time); }); observable.subscribe(time -> System.out.println(time)); Thread.sleep(1000 ); observable.subscribe(time -> System.out.println(time));
empty 操作符 这个操作符会生成一个没有发射数据的 Observable 对象,只会且直接调用 Observer#onComplete 方法。
empty example
1 2 3 4 5 6 7 Observable<String> empty = Observable.empty(); empty.subscribe( v -> System.out.println("This should never be printed!" ), error -> System.out.println("Or this!" ), () -> System.out.println("Done will be printed." ));
never 操作符 never
操作符不会调用观察者对象的 onNext、onComplete 或者 onError 方法,它主要用来测试或者禁用组合操作符中的某些 Observable 对象。
never example
1 2 3 4 5 6 Observable<String> never = Observable.never(); never.subscribe( v -> System.out.println("This should never be printed!" ), error -> System.out.println("Or this!" ), () -> System.out.println("This neither!" ));
error 操作符 error
操作符只会调用 Observer#error 方法
,常见 error
使用场景:
error example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Observable<String> results = Observable.fromCallable((Callable<String>) () -> { if (Math.random() < 0.5 ) { throw new IOException(); } throw new IllegalArgumentException(); }).onErrorResumeNext(error -> { if (error instanceof IllegalArgumentException) { return Observable.empty(); } return Observable.error(error); }); for (int i = 0 ; i < 10 ; i++) { results.subscribe( v -> System.out.println("This should never be printed!" ), error -> error.printStackTrace(), () -> System.out.println("Done" )); }
from 操作符 fromIterable 操作符 根据 Iterable (类似 List,Set,Collection 等) 对象创建可观察者模型。
fromIterable example
1 2 3 4 5 6 List<Integer> list = new ArrayList<>(Arrays.asList(1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 )); Observable<Integer> observable = Observable.fromIterable(list); observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), () -> System.out.println("Done" ));
fromArray 操作符 根据数组创建可观察者模型。
1 2 3 4 5 6 7 8 9 Integer[] array = new Integer[10 ]; for (int i = 0 ; i < array.length; i++) { array[i] = i; } Observable<Integer> observable = Observable.fromArray(array); observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), () -> System.out.println("Done" ));
fromCallable 操作符 当订阅事件发生,Callable中的方法会被调用,返回值会转发至观察者。
1 2 3 4 5 6 7 8 9 10 Callable<String> callable = () -> { System.out.println("Hello World!" ); return "Hello World!" ; }; Observable<String> observable = Observable.fromCallable(callable); observable.subscribe(item -> System.out.println(item), error -> error.printStackTrace(), () -> System.out.println("Done" ));
fromAction 操作符 //待验证…
1 2 3 4 5 Action action = () -> System.out.println("Hello World!" ); Completable completable = Completable.fromAction(action); completable.subscribe(() -> System.out.println("Done" ), error -> error.printStackTrace());
fromRunnable 操作符 //待验证…
1 2 3 4 5 Runnable runnable = () -> System.out.println("Hello World!"); Completable completable = Completable.fromRunnable(runnable); completable.subscribe(() -> System.out.println("Done"), error -> error.printStackTrace());
fromFuture 操作符 给定预先存在的,已经运行或已经完成的java.util.concurrent.Future,等待Future正常完成或以阻塞方式使用异常并将生成的值或异常转发给使用者。
1 2 3 4 5 6 7 8 9 10 11 12 ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); Future<String> future = executor.schedule(() -> "Hello world!" , 1 , TimeUnit.SECONDS); Observable<String> observable = Observable.fromFuture(future); observable.subscribe( item -> System.out.println(item), error -> error.printStackTrace(), () -> System.out.println("Done" )); executor.shutdown();
from{reactive type} 操作符 //待验证…
1 2 3 4 5 6 7 8 Flux<Integer> reactorFlux = Flux.fromCompletionStage(CompletableFuture.<Integer>completedFuture(1 )); Observable<Integer> observable = Observable.fromPublisher(reactorFlux); observable.subscribe( item -> System.out.println(item), error -> error.printStackTrace(), () -> System.out.println("Done" ));
generate 操作符 //待验证…
1 2 3 4 5 6 7 8 int startValue = 1 ;int incrementValue = 1 ;Flowable<Integer> flowable = Flowable.generate(() -> startValue, (s, emitter) -> { int nextValue = s + incrementValue; emitter.onNext(nextValue); return nextValue; }); flowable.subscribe(value -> System.out.println(value));
interval 操作符 interval
操作符指定定时发送发出数据的时间间隔,interval 有多个重载方法,最终都会调用下面的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @CheckReturnValue @NonNull @SchedulerSupport (SchedulerSupport.CUSTOM)public static Observable<Long> interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { ObjectHelper.requireNonNull(unit, "unit is null" ); ObjectHelper.requireNonNull(scheduler, "scheduler is null" ); return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L , initialDelay), Math.max(0L , period), unit, scheduler)); }
interval()
的其它重载方法默认不设置 initialDleay
值时,initialDelay
的值将和 period
值保持一致,默认的调度器 Schedulers.computation()
。在 java 中使用默认调度器的时候,interval 方法不会正常执行,程序会自动退出。interval 方法指定线程是未阻塞的,并不会阻止 JVM 退出程序。指定调度器 trampoline()
延长程序存活时间,interval 方法会正常执行。在Android中程序是在活跃的,不需要指定调度器。问题详情见 Rxjava issue
interval example
1 2 3 4 5 6 7 8 Observable<Long> clock = Observable.interval(1 , TimeUnit.SECONDS, Schedulers.trampoline()); clock.subscribe(time -> { if (time % 2 == 0 ) { System.out.println("Tick" ); } else { System.out.println("Tock" ); } });
just 操作符 根据 just
方法中顺序依次发射数据到下游观察者对象,just
方法可定义 1~9
个参数,但参数类型应保持一致。
just example
1 2 3 4 5 Observable<Object> observable = Observable.just("1" , "A" , "3.2" , "def" ); observable.subscribe(item -> System.out.print(item), error -> error.printStackTrace(), () -> System.out.println());
range 操作符 发出指定范围的整数队列值,调用的是 onNext 方法。
range example
1 2 3 4 5 6 7 8 9 String greeting = "Hello World!" ; Observable<Integer> indexes = Observable.range(0 , greeting.length()); Observable<Character> characters = indexes .map(index -> greeting.charAt(index)); characters.subscribe(character -> System.out.print(character), error -> error.printStackTrace(), () -> System.out.println());
timer 操作符 timer(long, TimeUnit)
方法指定延迟时间发射出指定数据,不重复发出。
timer example
1 2 3 Observable<Long> eggTimer = Observable.timer(5, TimeUnit.MINUTES); eggTimer.blockingSubscribe(v -> System.out.println("Egg is ready!"));
参考文章:
https://github.com/ReactiveX/RxJava/wiki/Creating-Observables